收发事务消息

本文提供使用 TCP 协议下的 Java SDK 收发事务消息的示例代码供您参考。

消息队列提供类似 X/Open XA 的分布式事务功能,通过消息队列事务消息,能达到分布式事务的最终一致。

说明

对于新手用户,建议在正式收发消息前,阅读 Demo 工程来了解搭建消息队列工程的具体步骤。

交互流程

事务消息交互流程如下图所示。

事务消息交互流程

详情请参见 消息类型 > 事务消息

前提条件

您已完成以下操作:

发送事务消息

说明

具体的示例代码,请以消息队列代码库为准。

发送事务消息包含以下两个步骤:

  1. 发送半事务消息(Half Message)及执行本地事务,示例代码如下。

    import java.util.Properties;
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.MessagingAccessPoint;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.OMSBuiltinKeys;
    import io.openmessaging.api.transaction.LocalTransactionChecker;
    import io.openmessaging.api.transaction.LocalTransactionExecuter;
    import io.openmessaging.api.transaction.TransactionProducer;
    import io.openmessaging.api.transaction.TransactionStatus;
    
    public class TransactionProducerTest {
        public static void main(String... args) {
            Properties credentials = new Properties();
            // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
            // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
            // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
            credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
            credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
    
            // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
            MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint").withCredentials(credentials).build();
    
            Properties properties = new Properties();
            // 设置用户实例,进入控制台的概览页面查看接入点配置
            properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
            properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
    
            TransactionProducer producer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() {
                @Override
                public TransactionStatus check (Message msg){
                    // check business commit status
                    return TransactionStatus.CommitTransaction;
                }
            });
            producer.start();
    
            Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
            producer.send(message, new LocalTransactionExecuter() {
                @Override
                public TransactionStatus execute(Message msg, Object arg) {
                    // if business success, then commit; else rollback
                    return TransactionStatus.CommitTransaction;
                }
            }, null);
        }
    }
  2. 提交事务消息状态。当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:

    • 执行本地事务完成后提交。

    • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。事务状态有以下三种:

      • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。

      • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。

      • TransactionStatus.Unknow 无法判断状态,期待消息队列的 Broker 向发送方再次询问该消息对应的本地事务的状态。

事务回查机制说明

  • 发送事务消息为什么必须要实现回查 Check 机制?当步骤 1 中半事务消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。消息队列发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:

    1. 检查该半事务消息对应的本地事务的状态(committed or rollback)。

    2. 向 Broker 提交该半事务消息本地事务的状态。

订阅事务消息

事务消息的订阅与普通消息订阅一致,详情请参见 订阅消息